Analyser loop¶
Goal¶
The goal of this algorithm is to determine the average derivatives and to predict the date (day and time) of the next actuator change for each sensor.
How it works¶
There are many steps used to calculate the date of the next actuator change step:

- Steps:
Step 0 : Get the sensor’s measure history since last actuator action
Step 1 : Transform to monotonic data
Step 2 : Remove constant values at the beginning
Step 3 : Fit a custom function to remaining data points
Step 4 : Extend the function until it crosses the next objective value
Step 5 : Get cross time
Step 6 : Compare with last data’s date
Get the sensor’s measure history since last actuator action¶
An SQL request gets all the measurements associated with a sensor since the last action of an actuator that influenced the sensor.
1 2 3 4 5 6 | SELECT value
FROM measure
WHERE
id_sensor = '{0}' AND
timeDate > '{1}'
ORDER BY timeDate ASC;
|
Transform to monotonic data¶
The algorithm must fit its function to monotonic data only.
This implies that the algorithm should discard all data recorded before the last change in the sign of the derivative.
This is achieved with a double python while loop.
Remove constant values at the beginning¶
When data is ordered in ascending order (ASC), this step removes all repeating data at the beginning. This improves the precision of the final estimation by an average of 20%.
In effect, the algorithm tries to fit a curve to a list of two-dimensional data points, removing constant measurements at the beginning improves the fit.
This functionality is integrated in the double while loop of Step 2
Fit a custom function to remaining data points¶
The algorithm now fits the following function to the remaining measurements
by changing parameters a, b, and c.
This is done with the function scipy.optimize.curve_fit
of scipy.
- Curve
If the system is far from the objective, the curve is concave and the estimation has to be a line. When the system is close to the objective, the curve is convex and the estimation is perfect.
- Initialisation
The initialisation of coefficients is crucial to find the right solution and reduce the calculation time.
a is the offset and it is initialised at the value of the last measure.
b is initialised at 1.
c is initialised between 0.1 and 1 and gradually increased by 10% each iteration.
Extend the function until it crosses the next objective value¶
Once the function has been generated, it is extended along the X axis to predict the future sensors measurements.
Step 3 returns the coefficients a, b, and c from which the best fit function f may be found.
We are therefore able to calculate the value of f for future time values and check when it will reach a given value.
Get cross time¶
Many conditions are checked in order to avoid an infinite calculation time.
A counter that can’t go over 10000 iteration.
This implies that the maximum calculation time is 24 hours.
Compare with last data’s date¶
Finally the returned value is simply the difference between the date of the intersection of the function with the objective and the last measurement’s date.
The returned value is a time in second, but the value stored in the database is a date. It allows to update the time to target on the UI without constantly making this computation.
The average derivative is also stored in order to give useful information to the user.
Examples¶
AC - Far from target |
AC - Close to target |
HE - Far from target |
HE - close to target |
Loop code¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | def loop():
import time
import db
import pubsub
import settings
import variables
import numpy as np
import math
import random
import scipy.optimize
name = __name__
Pub = pubsub.Pubsub() # Initialize communication object
Db = db.DB() # Initialize data storage object
St = settings.Settings() # Initialize variable storage object
Pub.get(name) # Initialize comminication channel __name__
time_loop = time.time() # Start time loop (To regulate max FPS)
while 1: # Infinite loop start
time.sleep(St.get("is_raspberry") * max(0, 1 / float(variables.FPS_analyser) - (time.time() - time_loop))) # Sleep time to not trigger the script more than the FPS ratio fixed
time_loop = time.time() # Start time loop (To regulate max FPS)
if Pub.get(name) or __name__ == "__main__" or not(St.get("is_raspberry")): # triggered or test from computer (from convergence.py or from this file)
sensors = Db.get_sensors_id() # Get all sensor id
for i in range(len(sensors)):
id_sensor = sensors[i]
actuators_involved = Db.get_actuator__sensors_id(id_sensor) # Get all actuators that can influence the actual loop sensor
dates = [] # Initialize list of last action of actuators
for j in range(len(actuators_involved)):
id_actuator = actuators_involved[j]
try:
dates.append(Db.get_action_date(id_actuator)) # Get the last action of the actuator
except IndexError: # If actuator never triggered
continue
if dates == []: # If no actuators involved
continue
try:
Y = Db.get_measures_date_sensor(id_sensor, max(dates)) # Get all measures of the sensor since a given date
except IndexError: # If no measures
continue
if __name__ == "__main__": # Test block for display datas on computer
Y= [25.2, 25.1, 25, 24.8, 24.6]
# print("raw",Y)
if len(Y)<=4: # If not enough datas
continue
while True: # Pic and constant data filter loop
breaker = True # breaker for the while loop
if len(Y) <= 4: # If not enough datas
break # Stop while loop
sens = 0
for j in range(len(Y) - 1):
m1 = Y[j] # measure j (before)
m2 = Y[j + 1] # measure j+1 (after)
if m1 > m2: # if there is a decrease
if sens > 0: # if there were a precedent increase
Y = Y[j:] # Data removing
breaker = False # Restart for loop with new datas
break # Break for loop
else:
sens = -1 # Set the way (decrease)
elif m1 < m2: # if there is an increase
if sens < 0: # if there were a precedent decrease
Y = Y[j:] # Data removing
breaker = False # Restart for loop with new datas
break # Break for loop
else:
sens = +1 # Set the way (decrease)
if breaker: # If monotonus datas
break # Stop while loop
# print("filtered",Y)
if len(Y) <= 4: # If monotonus datas
continue
def f(x, a, b, c): # Function to fit to datas
return a + b * x **min(1, c) # Have a look on the documentation for the justification of this function
dt = 1 / variables.FPS_sensor # Step time between two measures
X = [i * dt for i in range(len(Y))] # Generate X axis of Y datas
target = Db.get_sensor_target_delta(id_sensor) # get current target of the sensor
outer = abs(Db.get_sensor_outer(id_sensor)) # Get first hysteresis relative-to-target value
objective = target + outer * sens # Calcul the objective (value where an actuator will change of state)
average = sum(Y) / len(Y) # Calcul average
# if sens<0 and average<objective: # Double check for no coherent datas
# continue
# elif sens>0 and average>objective: # Double check for no coherent datas
# continue
precision = 10 ** -1 # power coeficient initialisation value
while True: # Fit function loop
try: # Try to fit F on (X,Y) datas
popt, pcov = scipy.optimize.curve_fit(f, X, Y,
(
Y[-1], # Initialize a coefficient (offset) with the last measure value
1, # Initialize b with 1
precision # Initialize c (power) with precision value (concave curve to line)
)
)
except Exception as e: # If no fit found
pass
breaker = False # initialisation of breaker for while loop
Xf = [1] # X generate result
Yf = [] # Y generate result
counter = 1 # counter to avoid infinite loop (if the function tend to a finite value without crossing objective value)
while True: # Extrapolation loop
try:
Yf.append(f(Xf[-1], *popt)) # Append Y value of the last X value
except:
break
if Xf[-1] > X[-1]: # If start to predict the futur
if (Yf[-1] - objective) * sens > 0: # if cross the objective line
breaker = True # Stop the prediction
break
counter *= 1.1 # Increase counter
if counter > 10 ** 5: # if there is no cross for next 24 hours
break
Xf.append(int(Xf[-1] + counter)) # Increase of one second
if breaker: # If objective has been reached
break
if precision > 1.1: # if no Fit curve found
break
precision *= 2 # Increase of the precision
if precision > 1.1: # If no fit found
continue
if __name__ == "__main__": # Test on computer block
import matplotlib.pyplot as plt
plt.scatter(X, Y) # plot measures takes for calculation
plt.plot(Xf, Yf) # plot calculation
plt.plot(Xf, [objective for i in Xf]) # plot objective
plt.show() # Display graph
if counter > 10 ** 5: # If no cross found
continue
time_to = Xf[-1] - X[-1] # Time calculation
date_change=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+time_to)) # add the time_to value
Db.set_sensor_date_change(id_sensor, date_change) # Store the change date in the database
# print("time_to",time_to)
# print("date_change",date_change)
pente = popt[1]
if sens > 0:
Db.set_sensor_derivative_up(id_sensor, pente) # Store the derivative up
elif sens < 0:
Db.set_sensor_derivative_down(id_sensor, pente) # Store the derivative down
if St.get("is_unittest") or __name__ == "__main__":
break
|